package com.ruoyi.flink.source;

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.sql.*;
import java.util.ArrayList;
import java.util.List;

/**
 * Flink async I/O function that looks up each incoming key in the MySQL {@code member} table.
 *
 * <p>For every input key a single query {@code select * from member where id = ?} is issued.
 * When a row is found, one {@code Tuple4} of (input key, {@code id}, {@code wx_id},
 * {@code nickname}) is emitted; when no row matches, the result is completed with an empty
 * collection.
 *
 * <p>NOTE(review): the JDBC query in {@link #asyncInvoke} is executed directly in the calling
 * method and its result is consumed before returning, so the lookup itself appears to be
 * blocking rather than truly asynchronous - confirm whether a dedicated executor or an
 * asynchronous client is wanted here.
 *
 * <p>Created 2020-12-04 10:53.
 *
 * @author zengchen
 */
public class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple4<String,Integer, String, String>> {

    /** Fully qualified name of the MySQL JDBC driver loaded before connecting. */
    private static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";

    // NOTE(review): connection settings and credentials are hard-coded in source; they should
    // be moved to external configuration / a secret store and the password rotated.
    /** JDBC URL of the database that holds the {@code member} table. */
    private static final String JDBC_URL =
            "jdbc:mysql://106.13.54.74:3306/user?useSSL=false&useUnicode=true&characterEncoding=utf8";

    /** Database user name. */
    private static final String JDBC_USER = "root";

    /** Database password. */
    private static final String JDBC_PASSWORD = "Ndsju2dsankD2Sdsdf754nSas8jfw";

    /** Parameterized lookup; the key is bound as a parameter instead of being concatenated. */
    private static final String SELECT_MEMBER_BY_ID = "select * from member where id = ?";

    /**
     * JDBC connection used for all lookups of this function instance. It is created in
     * {@link #open(Configuration)} and released in {@link #close()}; it is {@code transient}
     * so it is not serialized together with the function.
     */
    private transient Connection conn;

    /**
     * Loads the MySQL driver and opens the JDBC connection.
     *
     * @param parameters the configuration passed to this function (not read here)
     * @throws Exception if the driver class cannot be found or the connection cannot be opened
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName(JDBC_DRIVER); // load the driver
        conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
    }

    /**
     * Closes the JDBC connection if one was opened.
     *
     * @throws Exception if closing the connection fails
     */
    @Override
    public void close() throws Exception {
        // open() may have failed before the connection was assigned; avoid a NullPointerException.
        if (conn != null) {
            conn.close();
            conn = null;
        }
    }

    /**
     * Looks up the {@code member} row whose {@code id} equals {@code key} and completes
     * {@code resultFuture} with at most one tuple of (key, id, wx_id, nickname).
     *
     * @param key          the member id to look up
     * @param resultFuture the future completed with a one-element list when a row is found,
     *                     or with an empty list otherwise
     * @throws Exception if the query fails; the statement and result set are closed first
     */
    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple4<String, Integer, String, String>> resultFuture) throws Exception {

        List<Tuple4<String,Integer, String, String>> list = new ArrayList<>();

        // try-with-resources releases the statement and result set for every record,
        // including when the query throws.
        try (PreparedStatement statement = conn.prepareStatement(SELECT_MEMBER_BY_ID)) {
            statement.setString(1, key);
            try (ResultSet rs = statement.executeQuery()) {
                // Only the first matching row is used.
                if (rs.next()) {
                    int id = rs.getInt("id");
                    String wxId = rs.getString("wx_id");
                    String nickname = rs.getString("nickname");
                    list.add(Tuple4.of(key, id, wxId, nickname));
                }
            }
        }

        // Hand the collected rows to Flink.
        resultFuture.complete(list);
    }

    /**
     * Manual smoke test: connects to the database and prints the {@code id}, {@code wx_id}
     * and {@code nickname} of every row in the {@code member} table to standard output.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            Class.forName(JDBC_DRIVER);
            // All three JDBC resources are closed automatically, in reverse order.
            try (Connection conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
                 Statement statement = conn.createStatement();
                 ResultSet resultSet = statement.executeQuery("select * from member")) {
                while (resultSet.next()) {
                    int id = resultSet.getInt("id");
                    String wxId = resultSet.getString("wx_id");
                    String nickname = resultSet.getString("nickname");
                    System.out.println(id + " " + wxId + " " + nickname);
                }
            }
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
    }
}
